package com.chukun.flink.stream.action.fraud.rules.source;


import com.chukun.flink.stream.action.fraud.config.Config;
import com.chukun.flink.stream.action.fraud.config.Parameters;
import java.io.IOException;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import com.chukun.flink.stream.action.fraud.rules.functions.RuleDeserializer;
import com.chukun.flink.stream.action.fraud.rules.functions.RulesStaticJsonGenerator;
import com.chukun.flink.stream.action.fraud.rules.model.Rule;
import com.chukun.flink.stream.action.fraud.rules.utils.KafkaUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

/**
 * 规则数据流
 * @author chukun
 */
public class RulesSource {

  private static final int RULES_STREAM_PARALLELISM = 1;

  /**
   * 创建规则的数据流
   * @param config
   * @return
   * @throws IOException
   */
  public static SourceFunction<String> createRulesSource(Config config) throws IOException {

    String sourceType = config.get(Parameters.RULES_SOURCE);
    Type rulesSourceType = Type.valueOf(sourceType.toUpperCase());

    switch (rulesSourceType) {
      case KAFKA:
        Properties kafkaProps = KafkaUtils.initConsumerProperties(config);
        String rulesTopic = config.get(Parameters.RULES_TOPIC);
        FlinkKafkaConsumer011<String> kafkaConsumer =
            new FlinkKafkaConsumer011<>(rulesTopic, new SimpleStringSchema(), kafkaProps);
        kafkaConsumer.setStartFromLatest();
        return kafkaConsumer;
      case PUBSUB:
        return PubSubSource.newBuilder()
            .withDeserializationSchema(new SimpleStringSchema())
            .withProjectName(config.get(Parameters.GCP_PROJECT_NAME))
            .withSubscriptionName(config.get(Parameters.GCP_PUBSUB_RULES_SUBSCRIPTION))
            .build();
      case SOCKET:
        return new SocketTextStreamFunction("localhost", config.get(Parameters.SOCKET_PORT), "\n", -1);
      case STATIC:
        return new RulesStaticJsonGenerator();
      default:
        throw new IllegalArgumentException(
            "Source \""
                + rulesSourceType
                + "\" unknown. Known values are:"
                + Arrays.toString(Type.values()));
    }
  }

  public static DataStream<Rule> stringsStreamToRules(DataStream<String> ruleStrings) {
    return ruleStrings
        .flatMap(new RuleDeserializer())
        .name("Rule Deserialization")
        .setParallelism(RULES_STREAM_PARALLELISM)
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Rule>(Time.of(0, TimeUnit.MILLISECONDS)) {
              @Override
              public long extractTimestamp(Rule element) {
                // Prevents connected data+update stream watermark stalling.
                return Long.MAX_VALUE;
              }
            });
  }

  public enum Type {
    KAFKA("Rules Source (Kafka)"),
    PUBSUB("Rules Source (Pub/Sub)"),
    SOCKET("Rules Source (Socket)"),
    STATIC("Rules Source (Static Ruleset)");

    private String name;

    Type(String name) {
      this.name = name;
    }

    public String getName() {
      return name;
    }
  }
}
